Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal for dynamic pipelines #1480

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft

Proposal for dynamic pipelines #1480

wants to merge 16 commits into from

Conversation

ptodev
Copy link
Contributor

@ptodev ptodev commented Aug 15, 2024

PR Description

This is a proposal for starting up Alloy pipelines dynamically based on data in transit. For example, if a discovery component comes up with 10 targets, Alloy can start 10 sub-pipelines, each dealing with a different target.

I'm not sure if "dynamic pipelines" is a good name. I'm open to suggestions for a better one :)

It'd be great to come up with more use cases for this, so that we can invent a solution that fits as many use cases as possible.

Which issue(s) this PR fixes

Related to #1443
However, to fix the issue above we'd have to also come up with an implementation.

We are proposing a new feature to the [Alloy standard library][stdlib].
It will be similar to a `map` operation over a collection such as a `list()`.
Each `map` transformation will be done by a chain of components (a "sub-pipeline") created for this transformation.
Each item in the collection will be processed by a different "sub-pipeline".
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how feasible this is when the number of items in the collection is really big. Should there be a limit to how many sub-pipelines there are?

Comment on lines +240 to +244
* What about debug metrics? Should we aggregate the metrics for all "sub-pipelines"?
* If there is 1 series for each sub-pipeline, the amount of metrics could be huge.
Some service discovery mechanisms may generate a huge number of elements in a list of targets.
* If we want to aggregate the metrics, how would we do that? Is it even possible to do in within Alloy?
* Can we have a configuration parameter which dictates whether the metrics should be aggregated or not?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could become a huge blocker. I don't think we want a new series for each sub-pipeline, and I am not sure if we can avoid it.


// Every component defined in the "foreach" block will be instantiated for each item in the collection.
// The instantiated components will be scoped using the name of the foreach block and the index of the
// item in the collection. For example: /foreach.redis/0/prometheus.exporter.redis.default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/foreach.redis/0/prometheus.exporter.redis.default

Targets can come and go between discovery intervals. If you use the index to scope them you will need to shift it on every update. Maybe you can just use the target itself which should be unique: /foreach.redis/target/prometheus.exporter.redis.default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can just use the target itself which should be unique

What do you mean by "the target itself"? Do you mean the "address" label? We're not guaranteed to have such a label on the collection which is being iterated on :) The collection might not be targets. Also, some targets might not follow this convention of having an __address__ label.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the target (or more generally the item in the collection) can be hashed and we can use the hash for scoping and for avoid recomputations on update when a new collection comes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think targets going away in the middle of the list is a good point to consider. Hashing the way that William is proposing might work and we'd need to recommend people to try change the input targets as little as possible (e.g. use a reasonable refresh interval), otherwise there will be a lot of churn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically, couldn't there be hash collisions? I don't see how hashing is a very resilient way of handling this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With sufficiently long hash you don't need to worry about collisions. Unless there are implementation bugs.

If collisions worry you or we want to use shorter IDs than a 512bit hash, we could think of a mapper that will assign stable IDs to targets, using a simple int sequence.

* If we want to aggregate the metrics, how would we do that? Is it even possible to do in within Alloy?
* Can we have a configuration parameter which dictates whether the metrics should be aggregated or not?
* Do we have to recreate the sub-pipelines every time a new collection is received,
even if the new collection has the same number of elements?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you identify each sub-pipeline by its corresponding target, it should be ok to only add/remove sub-pipelines according to the new set of targets and keep the sub-pipelines that are still relevant running.

```alloy
// declare.dynamic "maps" each target to a sub-pipeline.
// Each sub-pipeline has 1 exporter, 1 relabel, and 1 scraper.
// Internally, maybe one way this can be done via serializing the pipeline to a string and then importing it as a module?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's an interesting idea. One way to do this could be similar to the import node. The declare.dynamic instance would manage a list of custom component nodes that it can run using the runner package. These custom component nodes would not be part of the initial graph and would take the pipeline template as AST body to run.

@captncraig
Copy link
Contributor

I really like the idea of it being component based. I am comparing to terraform's for_each, where you make a "template" resource and have an argument in there which tells it how to expand it into multiple components:

resource "azurerm_resource_group" "rg" {
  for_each = tomap({
    a_group       = "eastus"
    another_group = "westus2"
  })
  name     = each.key
  location = each.value
}
  • for_each can be any expression that evaluates to something iterable (in our case limiting to an array would probably be fine, but maybe we could do maps in a similar way).
  • The each keyword is valid in other arguments to reference the "current" item for iteration.

I really like the notion that this is a single component that would be "expanded" based on the evaluation of some list. It could be a built-in component or a previously imported dynamic component.

I am not sure how naming of the expanded components would work, but we could figure something out. I'm also not sure if it is possible to reference a dynamically created subcomponent from elsewhere in the config.


// Every component defined in the "foreach" block will be instantiated for each item in the collection.
// The instantiated components will be scoped using the name of the foreach block and the index of the
// item in the collection. For example: /foreach.redis/0/prometheus.exporter.redis.default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think targets going away in the middle of the list is a good point to consider. Hashing the way that William is proposing might work and we'd need to recommend people to try change the input targets as little as possible (e.g. use a reasonable refresh interval), otherwise there will be a lot of churn.

* The `foreach` name is consistent with other programming languages.

Cons:
* It looks less like a component than a `declare.dynamic` block.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a problem, you can still define a declare component (or import one) and use it inside the foreach syntax to make it smaller, add the parameters you need and reuse it for multiple foreach blocks.


[alloy-types]: https://grafana.com/docs/alloy/latest/get-started/configuration-syntax/expressions/types_and_values/

## Proposal 1: A foreach block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this one IMO.


We are proposing a new feature to the [Alloy standard library][stdlib].
It will be similar to a `map` operation over a collection such as a `list()`.
Each `map` transformation will be done by a chain of components (a "sub-pipeline") created for this transformation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I am not a fan of double-quoted quasi-concept of "sub-pipeline"... it's not essential to this proposal, but coming up with a better way to explain it using well-defined concepts is something we'll need to do for the documentation of this feature.

We should find answers to the unknowns below before this proposal is accepted:

* Will the solution only work for `list()`? What about `map()`?
* If we go with a `foreach`, we could have a `key` attribute in addition to the `var` one. That way we can also access the key. The `key` attribute can be a no-op if `collection` is a map?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, a map of {k1: v1, k2: v2} can be translated into list of entries: [{key: k1, value: v1}, {key: k2, value: v2}].

* Will the solution only work for `list()`? What about `map()`?
* If we go with a `foreach`, we could have a `key` attribute in addition to the `var` one. That way we can also access the key. The `key` attribute can be a no-op if `collection` is a map?
* What about debug metrics? Should we aggregate the metrics for all "sub-pipelines"?
* If there is 1 series for each sub-pipeline, the amount of metrics could be huge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explosion of metrics is an existing issue with Alloy and using modules...
Also, currently, if someone wanted to implement the use-case of, e.g. 10 redis exporters, and they chose to use modules, they would also get a ton of metrics AFAIU.

So I don't think it's a problem that should be part of this proposal. This is not to say that I don't think it's a problem, we should ideally look into how we can handle metrics for modules a bit better. But IMO this should be a new proposal that will deal with this problem for all usages of modules.

* Can we have a configuration parameter which dictates whether the metrics should be aggregated or not?
* Do we have to recreate the sub-pipelines every time a new collection is received,
even if the new collection has the same number of elements?
* Do we need to have more than one output, of a different type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean... in Proposal 1 you can output to as many things as you want, AFAICT

* Do we have to recreate the sub-pipelines every time a new collection is received,
even if the new collection has the same number of elements?
* Do we need to have more than one output, of a different type?
* Do we need to have more than one input, of a different type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO no. You can use multiple foreach or perhaps merge your targets using something like the join in your other proposal?

Copy link
Contributor

github-actions bot commented Oct 7, 2024

This PR has not had any activity in the past 30 days, so the needs-attention label has been added to it.
If you do not have enough time to follow up on this PR or you think it's no longer relevant, consider closing it.
The needs-attention label signals to maintainers that something has fallen through the cracks. No action is needed by you; your PR will be kept open and you do not have to respond to this comment. The label will be removed the next time this job runs if there is new activity.
Thank you for your contributions!

@ptodev
Copy link
Contributor Author

ptodev commented Oct 29, 2024

It looks likely that we are going to go with the foreach option. I updated the PR to add some draft code that I've been working on. As a first step, it'd be good to have a unit tests that doesn't use the proposed var argument. The test could look something like what's proposed in internal/runtime/testdata/foreach/foreach_1.txtar.

As a second step, we will need to figure out how to refer to the values that are currently iterated on. Should we actually have a var argument? I think it might be unnecessary - having a predefined argument (e.g. val) which refers to the current thing being iterated on may be enough. I'm not yet sure how to add this reserved word into the Alloy syntax though.

By the way, apparently the Collector already has something like dynamic pipelines. There are observer extensions which you can hook up to a recover creator component. For example, you can have a k8s observer which crates a receiver for each discovered pod.

@ptodev ptodev force-pushed the ptodev/dynamic-pipelines branch from dd602e6 to 378cb04 Compare October 31, 2024 19:35
@ptodev ptodev force-pushed the ptodev/dynamic-pipelines branch from 378cb04 to cc02bfc Compare November 20, 2024 09:43
@ptodev ptodev force-pushed the ptodev/dynamic-pipelines branch from d8b2547 to d1591b1 Compare December 16, 2024 10:37
@wildum wildum force-pushed the ptodev/dynamic-pipelines branch from e8cd817 to 8217e86 Compare December 19, 2024 14:40
@wildum wildum force-pushed the ptodev/dynamic-pipelines branch from 57e22d1 to b041704 Compare January 8, 2025 16:14
@wildum wildum force-pushed the ptodev/dynamic-pipelines branch from b041704 to 421204a Compare January 9, 2025 09:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants